5.2 channel的底层实现
通道在并发编程中是比较常用的一个内容,在我们进行实际业务开发时会大量使用到。
本节我们将针对通道的底层实现原理进行探讨学习,包括底层的结构、处理机制等内容。
本节代码存放目录为 lesson14
通道的基本结构
我们可以通过源码包 runtime/chan.go
找到关于通道的源码,其中详细定义了通道的结构以及相关的实现。
结构如下所示:
type hchan struct {
qcount uint // 通道中的元素个数
dataqsiz uint // 环形队列的大小(缓冲区大小)
buf unsafe.Pointer // 指向底层缓冲区的指针
elemsize uint16 // 每个元素的大小
closed uint32 // 标识通道是否已关闭
sendx uint // 下一个发送元素的索引
recvx uint // 下一个接收元素的索引
recvq waitq // 接收者等待队列
sendq waitq // 发送者等待队列
lock mutex // 互斥锁,保护通道的并发操作
}
在上面的结构中,我们可以主要关注bug
、recvq
、sendq
这三个内容。
buf
这是指向底层缓冲区的指针,主要用于有缓冲通道,无缓冲通道是用不到这个东西的。
也就是说,对于有缓冲通道,数据发送后就会首先进入到buf
中,当读取数据时也是从buf
中读取。
例如我们做如下声明:
ch := make(chan int, 3)
那么在底层buf
的实际结构可能是这样的:
+----+----+----+
| 1 | 2 | | <- 这是缓冲区
+----+----+----+
sendx
这时候可能指向第三个空位置,表示下一个数据将存放在这里。
recvx
可能指向第一个位置,表示下一个读取操作将从这里获取数据。
当sendx
或recvx
到达缓冲区的末尾时,它们会环绕回到数组的起始位置 (这就是环形队列的特性)。
recvq 与 sendq
这两者都是一个队列,准确的说是一个接收任务队列与一个发送任务队列。
当一个Goroutine
尝试从通道接收数据,但通道(无缓冲或缓冲区为空)没有可用的数据时,该Goroutine
会被阻塞,并被放入recvq
队列中。
一旦通道中有数据可用,recvq
中最早阻塞的Goroutine
会被唤醒,从通道中接收数据并继续执行。
当一个Goroutine
尝试向通道发送数据,但通道(无缓冲或缓冲区已满)没有空闲空间时,该Goroutine
会被阻塞,并被放入sendq
队列中。
一旦通道有空闲的空间(例如,接收操作消耗了一个缓冲区数据),sendq
中最早阻塞的Goroutine
会被唤醒,继续发送数据。
我们可以通过下面的例子来查看:
noBufChan := make(chan int)
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println("enter goroutine receive 1")
for ch := range noBufChan {
fmt.Printf("noBufChan receive 1 -> %d\n", ch)
}
}()
time.Sleep(time.Duration(1) * time.Second)
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println("enter goroutine receive 2")
for ch := range noBufChan {
fmt.Printf("noBufChan receive 2 -> %d\n", ch)
}
}()
time.Sleep(time.Duration(1) * time.Second)
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println("enter goroutine send 1")
time.Sleep(time.Duration(3) * time.Second)
for i := 0; i < 100; i++ {
noBufChan <- i
fmt.Printf("noBufChan send -> %d\n", i)
time.Sleep(time.Duration(1) * time.Second)
}
}()
wg.Wait()
结果输出如下所示:
enter goroutine receive 1
enter goroutine receive 2
enter goroutine send 1
noBufChan receive 1 -> 0
noBufChan send -> 0
noBufChan send -> 1
noBufChan receive 2 -> 1
noBufChan send -> 2
noBufChan receive 1 -> 2
在上面的代码中,我们先使用receive 1
、receive 2
读取数据,这时候通道里面是没有数据的,所以会阻塞。
等到send
开始执行时,读取协程开始恢复接收,读取到发送的数据。
通道关闭机制及性能优化
关闭机制
通道的关闭会经过一系列的资源释放及状态标记,我们可以通过源码包 runtime/chan.go
中的closechan
函数查看具体实现。
主要分为以下几个步骤:
检查通道是否为
nil
:if c == nil { panic(plainError("close of nil channel")) }
首先检查通道是否为
nil
,如果是nil
通道,调用panic
抛出错误。获取通道的锁:
lock(&c.lock)
Go
语言的通道是线程安全的,使用锁来确保多个Goroutine
同时操作通道时不会发生竞态条件。检查通道是否已经关闭:
if c.closed != 0 { unlock(&c.lock) panic(plainError("close of closed channel")) }
检查通道是否已经关闭。如果通道已经关闭,再次关闭时会触发
panic
。竞态条件检测:
if raceenabled { callerpc := getcallerpc() racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan)) racerelease(c.raceaddr()) }
这里的
raceenabled
是竞态条件检测相关的逻辑。racewritepc
和racerelease
用于检测并处理竞态条件,确保在竞态检测开启的情况下能够捕捉到相关问题。标记通道已关闭:
c.closed = 1
将通道的
closed
字段设为1
,表示通道已经关闭。处理接收者队列:
var glist gList // release all readers for { sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) }
遍历通道的接收者队列
recvq
,将所有等待接收数据的Goroutine
释放。对于每一个在接收队列中的
Goroutine
,如果它的elem
字段非nil
,会将其清空,并将Goroutine
标记为未成功接收数据。最后将这些
Goroutine
加入到glist
中。处理发送者队列:
// release all writers (they will panic) for { sg := c.sendq.dequeue() if sg == nil { break } sg.elem = nil if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) }
类似地,遍历发送者队列
sendq
,将所有等待发送数据的Goroutine
释放,并将其加入到glist
中。由于通道关闭后不能再发送数据,所有在发送队列中的
Goroutine
将会panic
。释放锁:
unlock(&c.lock)
完成通道关闭操作后,释放通道的锁。
唤醒所有阻塞的
Goroutine
:// Ready all Gs now that we've dropped the channel lock. for !glist.empty() { gp := glist.pop() gp.schedlink = 0 goready(gp, 3) }
最后,唤醒
glist
中所有的Goroutine
,即唤醒那些被阻塞在接收或发送操作上的Goroutine
,这些Goroutine
会检测通道已经关闭,并采取相应的操作(如返回零值或panic
)。
性能优化与通道使用建议
上文中我们已经对通道的结构及原理进行了一个大致的了解,在日常使用的时候我们也需要注意一些内容。
使用缓冲通道提高性能:在高并发场景下,适当的缓冲区大小可以减少
Goroutine
的阻塞次数,从而提高性能。避免滥用无缓冲通道:无缓冲通道会导致更多的阻塞和上下文切换,可能导致性能问题,尤其是在大量数据传输时,一般来说无缓冲通道我们都是用在一些流程控制上。
小结
本节我们讲解了通道的底层结构以及使用建议。
关于本节总结如下:
底层通过
buf
实现缓冲通道通过发送队列、接收队列实现堵塞
关闭通道时释放一系列资源、唤醒读取堵塞队列